package com.lambkit.dao.record;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import com.jfinal.kit.Kv;
import com.jfinal.kit.LogKit;
import com.jfinal.plugin.activerecord.*;
import com.jfinal.plugin.activerecord.cache.ICache;
import com.lambkit.LambkitConsts;
import com.lambkit.db.sql.Column;
import com.lambkit.db.sql.Columns;
import com.lambkit.db.sql.Example;
import com.lambkit.plugin.activerecord.dialect.LambkitDialect;

import java.sql.*;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

public abstract class BaseLambkitApiService<M extends LambkitRecord> implements LambkitApiService<M> {
	
	public abstract String getConfigName();
	public abstract String getTableName();
	public abstract String getPrimaryKeyName();
	public abstract Class<M> getLambkitRecordClass();
	public abstract Function<M, Boolean> getFunction(Function<M, Boolean> func);

	public Config getConfig() {
		String configName = getConfigName();
		Config config = null;
		if (configName != null) {
			config = DbKit.getConfig(configName);
		}
		if(config == null) {
			config = DbKit.getConfig();
			if (DbKit.getConfig() == null) {
				throw new RuntimeException("The main getConfig() is null, initialize ActiveRecordPlugin first");
			}
		}
		if (config == null) {
			throw new RuntimeException("The " + configName + " getConfig() is null, initialize ActiveRecordPlugin first");
		}
		return config;
	}

	public LambkitDialect getDialect() {
		return (LambkitDialect) getConfig().getDialect();
	}
	public String getPrimaryKey() {
		String primaryKey = getPrimaryKeyName();
		return StrUtil.isBlank(primaryKey) ? getConfig().getDialect().getDefaultPrimaryKey() : primaryKey;
	}

	public M findById(Object idValue) {
		return findById(getTableName(), idValue);
	}

	@Override
	public M findByPrimaryKey(Object id) {
		return findById(id);
	}

	@Override
	public M findByIds(Object... idValues) {
		return findByIds(getTableName(), getPrimaryKey(), idValues);
	}

	@Override
	public M findFirst(Example example) {
		SqlPara sqlPara = getDialect().forFindByExample(example, null);
		return findFirst(sqlPara);
	}

	@Override
	public M findFirst(Columns columns) {

		return findFirst(columns, null);
	}

	@Override
	public M findFirst(Columns columns, String orderby) {

		String sql = getDialect().forFindByColumns(getTableName(), "*", columns, orderby, 1);
		LinkedList<Object> params = new LinkedList<Object>();

		if (CollUtil.isNotEmpty(columns.getList())) {
			for (Column column : columns.getList()) {
				column.addValueToParam(params);
			}
		}
		return findFirst(sql, params.toArray());
	}

	@Override
	public List<M> find(Example example, Integer count) {

		SqlPara sqlPara = getDialect().forFindByExample(example, count);
		return find(sqlPara);
	}

	@Override
	public List<M> find(Columns columns, Integer count) {

		return find(columns, null, count);
	}

	@Override
	public List<M> find(Columns columns, String orderby, Integer count) {

		LinkedList<Object> params = new LinkedList<Object>();

		if (CollUtil.isNotEmpty(columns.getList())) {
			for (Column column : columns.getList()) {
				column.addValueToParam(params);
			}
		}

		String sql = getDialect().forFindByColumns(getTableName(), "*", columns, orderby, count);
		return params.isEmpty() ? find(sql) : find(sql, params.toArray());
	}

	@Override
	public List<M> find(Example example) {

		SqlPara sqlPara = getDialect().forFindByExample(example, null);
		return find(sqlPara);
	}

	@Override
	public List<M> find(Columns columns) {

		return find(columns, null, null);
	}

	@Override
	public List<M> find(Columns columns, String orderby) {

		return find(columns, orderby, null);
	}

	@Override
	public Page<M> paginate(Integer pageNumber, Integer pageSize, Example example) {

		SqlPara sqlPara = getDialect().forPaginateByExample(example);
		return paginate(pageNumber, pageSize, sqlPara);
	}

	@Override
	public Page<M> paginate(Integer pageNumber, Integer pageSize, Columns columns) {

		return paginate(pageNumber, pageSize, columns, null);
	}

	@Override
	public Page<M> paginate(Integer pageNumber, Integer pageSize, Columns columns, String orderby) {

		String selectPartSql = getDialect().forPaginateSelect("*");
		String fromPartSql = getDialect().forPaginateByColumns(getTableName(), columns, orderby);

		LinkedList<Object> params = new LinkedList<Object>();

		if (CollUtil.isNotEmpty(columns.getList())) {
			for (Column column : columns.getList()) {
				column.addValueToParam(params);
			}
		}
		return params.isEmpty() ? paginate(pageNumber, pageSize, selectPartSql, fromPartSql)
				: paginate(pageNumber, pageSize, selectPartSql, fromPartSql, params.toArray());
	}

	@Override
	public Page<M> paginate(Example example, Integer offset, Integer limit) {

		int pageSize = limit;
		int pageNumber = offset / pageSize + 1;
		return paginate(pageNumber, pageSize, example);
	}

	@Override
	public Page<M> paginate(Columns columns, Integer offset, Integer limit) {

		int pageSize = limit;
		int pageNumber = offset / pageSize + 1;
		return paginate(pageNumber, pageSize, columns);
	}


	@Override
	public Page<M> paginate(Columns columns, String orderby, Integer offset, Integer limit) {

		int pageSize = limit;
		int pageNumber = offset / pageSize + 1;
		return paginate(pageNumber, pageSize, columns, orderby);
	}

	public Long count(Example example) {
		example.setSelectSql(" count(*) ");
		SqlPara sqlPara = getDialect().forFindByExample(example, null);
		return Db.use(getConfig().getName()).queryLong(sqlPara.getSql(), sqlPara.getPara());
	}

	public Long count(Columns columns) {
		return count(Example.create(getTableName(), columns));
	}

	public boolean save(M lambkitRecord) {
		return save(getTableName(), getPrimaryKey(), lambkitRecord);
	}

	@Override
	public boolean delete(M lambkitRecord) {
		return delete(getTableName(), getPrimaryKey(), lambkitRecord);
	}

	public boolean deleteById(Object id) {
		return deleteById(getTableName(), getPrimaryKey(), id);
	}

	public boolean deleteByIds(Object ...ids) {
		return deleteByIds(getTableName(), getPrimaryKey(), ids);
	}

	@Override
	public int delete(Example example) {
		SqlPara sqlPara = getDialect().forDeleteByExample(example);
		return delete(sqlPara);
	}

	public int delete(SqlPara sqlPara) {
		return delete(sqlPara.getSql(), sqlPara.getPara());
	}

	@Override
	public int delete(Columns columns) {
		return delete(Example.create(getTableName(), columns));
	}

	@Override
	public boolean update(M lambkitRecord) {
		return update(getTableName(), getPrimaryKey(), lambkitRecord);
	}

	@Override
	public int update(M lambkitRecord, Example example) {
		SqlPara sqlPara = getDialect().forUpdateByExample(lambkitRecord, example);
		return update(sqlPara);
	}

	@Override
	public int update(M lambkitRecord, Columns columns) {
		return update(lambkitRecord, Example.create(getTableName(), columns));
	}

	@Override
	public int[] batchSave(List<? extends M> recordList, int batchSize) {
		return batchSave(getTableName(), recordList, batchSize);
	}

	@Override
	public int[] batchUpdate(List<? extends M> recordList, int batchSize) {
		return batchUpdate(getTableName(), getPrimaryKey(), recordList, batchSize);
	}

	public List<M> findAll() {
		return findAll(getTableName());
	}
	//---------------------------------db---------------------------------------------------------------

	/**
	 * Execute sql update
	 */
	protected int update(Config config, Connection conn, String sql, Object... paras) throws SQLException {
		try (PreparedStatement pst = conn.prepareStatement(sql)) {
			config.getDialect().fillStatement(pst, paras);
			int result = pst.executeUpdate();
			return result;
		}
	}

	/**
	 * Execute update, insert or delete sql statement.
	 * @param sql an SQL statement that may contain one or more '?' IN parameter placeholders
	 * @param paras the parameters of sql
	 * @return either the row count for <code>INSERT</code>, <code>UPDATE</code>,
	 *         or <code>DELETE</code> statements, or 0 for SQL statements
	 *         that return nothing
	 */
	public int update(String sql, Object... paras) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();
			return update(getConfig(), conn, sql, paras);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	/**
	 * @see #update(String, Object...)
	 * @param sql an SQL statement
	 */
	public int update(String sql) {
		return update(sql, LambkitConsts.NULL_PARA_ARRAY);
	}

	protected List<M> find(Config config, Connection conn, String sql, Object... paras) throws SQLException, InstantiationException, IllegalAccessException {
		try (PreparedStatement pst = conn.prepareStatement(sql)) {
			config.getDialect().fillStatement(pst, paras);
			ResultSet rs = pst.executeQuery();
			List<M> result = LambkitPojoBuilder.me.build(config, rs, getLambkitRecordClass(), getFunction(null));
			//config.getDialect().buildRecordList(config, rs);
			// RecordBuilder.build(config, rs);
			close(rs);
			return result;
		}
	}

	/**
	 * @see #find(String, Object...)
	 */
	public List<M> find(String sql, Object... paras) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();
			return find(getConfig(), conn, sql, paras);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	/**
	 * @see #find(String, Object...)
	 * @param sql the sql statement
	 */
	public List<M> find(String sql) {
		return find(sql, LambkitConsts.NULL_PARA_ARRAY);
	}

	public List<M> findAll(String tableName) {
		String sql = getConfig().getDialect().forFindAll(tableName);
		return find(sql, LambkitConsts.NULL_PARA_ARRAY);
	}

	/**
	 * Find first lambkitRecord. I recommend add "limit 1" in your sql.
	 * @param sql an SQL statement that may contain one or more '?' IN parameter placeholders
	 * @param paras the parameters of sql
	 * @return the M object
	 */
	public M findFirst(String sql, Object... paras) {
		List<M> result = find(sql, paras);
		return result.size() > 0 ? result.get(0) : null;
	}

	/**
	 * @see #findFirst(String, Object...)
	 * @param sql an SQL statement
	 */
	public M findFirst(String sql) {
		return findFirst(sql, LambkitConsts.NULL_PARA_ARRAY);
	}

	/**
	 * Find lambkitRecord by id with default primary key.
	 * <pre>
	 * Example:
	 * M user = Db.use().findById("user", 15);
	 * </pre>
	 * @param tableName the table name of the table
	 * @param idValue the id value of the lambkitRecord
	 */
	public M findById(String tableName, Object idValue) {
		return findByIds(tableName, getPrimaryKey(), idValue);
	}

	public M findById(String tableName, String primaryKey, Object idValue) {
		return findByIds(tableName, primaryKey, idValue);
	}

	/**
	 * Find lambkitRecord by ids.
	 * <pre>
	 * Example:
	 * M user = Db.use().findByIds("user", "user_id", 123);
	 * M userRole = Db.use().findByIds("user_role", "user_id, role_id", 123, 456);
	 * </pre>
	 * @param tableName the table name of the table
	 * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
	 * @param idValues the id value of the lambkitRecord, it can be composite id values
	 */
	public M findByIds(String tableName, String primaryKey, Object... idValues) {
		String[] pKeys = primaryKey.split(",");
		if (pKeys.length != idValues.length) {
			throw new IllegalArgumentException("primary key number must equals id value number");
		}

		String sql = getConfig().getDialect().forDbFindById(tableName, pKeys);
		List<M> result = find(sql, idValues);
		return result.size() > 0 ? result.get(0) : null;
	}

	/**
	 * Delete lambkitRecord by id with default primary key.
	 * <pre>
	 * Example:
	 * Db.use().deleteById("user", 15);
	 * </pre>
	 * @param tableName the table name of the table
	 * @param idValue the id value of the lambkitRecord
	 * @return true if delete succeed otherwise false
	 */
	public boolean deleteById(String tableName, Object idValue) {
		return deleteByIds(tableName, getPrimaryKey(), idValue);
	}

	public boolean deleteById(String tableName, String primaryKey, Object idValue) {
		return deleteByIds(tableName, primaryKey, idValue);
	}

	/**
	 * Delete lambkitRecord by ids.
	 * <pre>
	 * Example:
	 * Db.use().deleteByIds("user", "user_id", 15);
	 * Db.use().deleteByIds("user_role", "user_id, role_id", 123, 456);
	 * </pre>
	 * @param tableName the table name of the table
	 * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
	 * @param idValues the id value of the lambkitRecord, it can be composite id values
	 * @return true if delete succeed otherwise false
	 */
	public boolean deleteByIds(String tableName, String primaryKey, Object... idValues) {
		String[] pKeys = primaryKey.split(",");
		if (pKeys.length != idValues.length) {
			throw new IllegalArgumentException("primary key number must equals id value number");
		}

		String sql = getConfig().getDialect().forDbDeleteById(tableName, pKeys);
		boolean flag = update(sql, idValues) >= 1;
		if(flag) {
			Kv data = Kv.by("tableName", tableName).set("primaryKey", primaryKey).set("idValues", idValues);
			data.set("sql", sql);
			//EventKit.sendEvent(getTableName()+":delete", data);
		}
		return flag;
	}

	/**
	 * Delete lambkitRecord.
	 * <pre>
	 * Example:
	 * boolean succeed = Db.use().delete("user", "id", user);
	 * </pre>
	 * @param tableName the table name of the table
	 * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
	 * @param lambkitRecord the lambkitRecord
	 * @return true if delete succeed otherwise false
	 */
	public boolean delete(String tableName, String primaryKey, M lambkitRecord) {
		String[] pKeys = primaryKey.split(",");
		if (pKeys.length <= 1) {
			Object t = lambkitRecord.get(primaryKey);	// 引入中间变量避免 JDK 8 传参有误
			return deleteByIds(tableName, primaryKey, t);
		}

		getConfig().getDialect().trimPrimaryKeys(pKeys);
		Object[] idValue = new Object[pKeys.length];
		for (int i=0; i<pKeys.length; i++) {
			idValue[i] = lambkitRecord.get(pKeys[i]);
			if (idValue[i] == null) {
				throw new IllegalArgumentException("The value of primary key \"" + pKeys[i] + "\" can not be null in lambkitRecord object");
			}
		}
		return deleteByIds(tableName, primaryKey, idValue);
	}

	/**
	 * <pre>
	 * Example:
	 * boolean succeed = Db.use().delete("user", user);
	 * </pre>
	 * @see #delete(String, String, M)
	 */
	public boolean delete(String tableName, M lambkitRecord) {
		String defaultPrimaryKey = getPrimaryKey();
		Object t = lambkitRecord.get(defaultPrimaryKey);	// 引入中间变量避免 JDK 8 传参有误
		return deleteByIds(tableName, defaultPrimaryKey, t);
	}

	/**
	 * Execute delete sql statement.
	 * @param sql an SQL statement that may contain one or more '?' IN parameter placeholders
	 * @param paras the parameters of sql
	 * @return the row count for <code>DELETE</code> statements, or 0 for SQL statements
	 *         that return nothing
	 */
	public int delete(String sql, Object... paras) {
		return update(sql, paras);
	}

	/**
	 * @see #delete(String, Object...)
	 * @param sql an SQL statement
	 */
	public int delete(String sql) {
		return update(sql);
	}

	protected <T> List<T> query(Config config, Connection conn, String sql, Object... paras) throws SQLException {
		List result = new ArrayList();
		try (PreparedStatement pst = conn.prepareStatement(sql)) {
			config.getDialect().fillStatement(pst, paras);
			ResultSet rs = pst.executeQuery();
			int colAmount = rs.getMetaData().getColumnCount();
			if (colAmount > 1) {
				while (rs.next()) {
					Object[] temp = new Object[colAmount];
					for (int i=0; i<colAmount; i++) {
						temp[i] = rs.getObject(i + 1);
					}
					result.add(temp);
				}
			}
			else if(colAmount == 1) {
				while (rs.next()) {
					result.add(rs.getObject(1));
				}
			}
			close(rs);
			return result;
		}
	}

	/**
	 * Paginate.
	 * @param pageNumber the page number
	 * @param pageSize the page size
	 * @param select the select part of the sql statement
	 * @param sqlExceptSelect the sql statement excluded select part
	 * @param paras the parameters of sql
	 * @return the Page object
	 */
	public Page<M> paginate(int pageNumber, int pageSize, String select, String sqlExceptSelect, Object... paras) {
		return doPaginate(pageNumber, pageSize, null, select, sqlExceptSelect, paras);
	}

	/**
	 * @see #paginate(int, int, String, String, Object...)
	 */
	public Page<M> paginate(int pageNumber, int pageSize, String select, String sqlExceptSelect) {
		return doPaginate(pageNumber, pageSize, null, select, sqlExceptSelect, LambkitConsts.NULL_PARA_ARRAY);
	}

	public Page<M> paginate(int pageNumber, int pageSize, boolean isGroupBySql, String select, String sqlExceptSelect, Object... paras) {
		return doPaginate(pageNumber, pageSize, isGroupBySql, select, sqlExceptSelect, paras);
	}

	protected Page<M> doPaginate(int pageNumber, int pageSize, Boolean isGroupBySql, String select, String sqlExceptSelect, Object... paras) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();
			String totalRowSql = getConfig().getDialect().forPaginateTotalRow(select, sqlExceptSelect, null);
			StringBuilder findSql = new StringBuilder();
			findSql.append(select).append(' ').append(sqlExceptSelect);
			return doPaginateByFullSql(getConfig(), conn, pageNumber, pageSize, isGroupBySql, totalRowSql, findSql, paras);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	protected Page<M> doPaginateByFullSql(Config config, Connection conn, int pageNumber, int pageSize, Boolean isGroupBySql, String totalRowSql, StringBuilder findSql, Object... paras) throws SQLException, InstantiationException, IllegalAccessException {
		if (pageNumber < 1 || pageSize < 1) {
			throw new ActiveRecordException("pageNumber and pageSize must more than 0");
		}
		if (config.getDialect().isTakeOverDbPaginate()) {
			//return getConfig().getDialect().takeOverDbPaginate(conn, pageNumber, pageSize, isGroupBySql, totalRowSql, findSql, paras);
			throw new RuntimeException("You should implements this method in " + config.getDialect().getClass().getName());
		}

		List result = query(config, conn, totalRowSql, paras);
		int size = result.size();
		if (isGroupBySql == null) {
			isGroupBySql = size > 1;
		}

		long totalRow;
		if (isGroupBySql) {
			totalRow = size;
		} else {
			totalRow = (size > 0) ? ((Number)result.get(0)).longValue() : 0;
		}
		if (totalRow == 0) {
			return new Page<M>(new ArrayList<M>(0), pageNumber, pageSize, 0, 0);
		}

		int totalPage = (int) (totalRow / pageSize);
		if (totalRow % pageSize != 0) {
			totalPage++;
		}

		if (pageNumber > totalPage) {
			return new Page<M>(new ArrayList<M>(0), pageNumber, pageSize, totalPage, (int)totalRow);
		}

		// --------
		String sql = config.getDialect().forPaginate(pageNumber, pageSize, findSql);
		List<M> list = find(config, conn, sql, paras);
		return new Page<M>(list, pageNumber, pageSize, totalPage, (int)totalRow);
	}

	protected Page<M> paginate(Config config, Connection conn, int pageNumber, int pageSize, String select, String sqlExceptSelect, Object... paras) throws SQLException, InstantiationException, IllegalAccessException {
		String totalRowSql = config.getDialect().forPaginateTotalRow(select, sqlExceptSelect, null);
		StringBuilder findSql = new StringBuilder();
		findSql.append(select).append(' ').append(sqlExceptSelect);
		return doPaginateByFullSql(config, conn, pageNumber, pageSize, null, totalRowSql, findSql, paras);
	}

	protected Page<M> doPaginateByFullSql(int pageNumber, int pageSize, Boolean isGroupBySql, String totalRowSql, String findSql, Object... paras) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();
			StringBuilder findSqlBuf = new StringBuilder().append(findSql);
			return doPaginateByFullSql(getConfig(), conn, pageNumber, pageSize, isGroupBySql, totalRowSql, findSqlBuf, paras);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	public Page<M> paginateByFullSql(int pageNumber, int pageSize, String totalRowSql, String findSql, Object... paras) {
		return doPaginateByFullSql(pageNumber, pageSize, null, totalRowSql, findSql, paras);
	}

	public Page<M> paginateByFullSql(int pageNumber, int pageSize, boolean isGroupBySql, String totalRowSql, String findSql, Object... paras) {
		return doPaginateByFullSql(pageNumber, pageSize, isGroupBySql, totalRowSql, findSql, paras);
	}

	protected boolean save(Config config, Connection conn, String tableName, String primaryKey, LambkitRecord lambkitRecord) throws SQLException {
		String[] pKeys = primaryKey.split(",");
		List<Object> paras = new ArrayList<Object>();
		StringBuilder sql = new StringBuilder();

		config.getDialect().forDbSave(tableName, pKeys, lambkitRecord, sql, paras);

		try (PreparedStatement pst =
					 config.getDialect().isOracle() ?
							 conn.prepareStatement(sql.toString(), pKeys) :
							 conn.prepareStatement(sql.toString(), Statement.RETURN_GENERATED_KEYS)) {
			config.getDialect().fillStatement(pst, paras);
			int result = pst.executeUpdate();
			config.getDialect().getRecordGeneratedKey(pst, lambkitRecord, pKeys);
			clearModifyFlag(lambkitRecord);
			return result >= 1;
		}
	}

	/**
	 * Save lambkitRecord.
	 * <pre>
	 * Example:
	 * M userRole = new M().set("user_id", 123).set("role_id", 456);
	 * Db.use().save("user_role", "user_id, role_id", userRole);
	 * </pre>
	 * @param tableName the table name of the table
	 * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
	 * @param lambkitRecord the lambkitRecord will be saved
	 */
	protected boolean save(String tableName, String primaryKey, LambkitRecord lambkitRecord) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();
			boolean flag = save(getConfig(), conn, tableName, primaryKey, lambkitRecord);
			if(flag) {
				//EventKit.sendEvent(getTableName()+":add", lambkitRecord);
			}
			return flag;
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	protected boolean save(String tableName, LambkitRecord lambkitRecord) {
		return save(tableName, getPrimaryKey(), lambkitRecord);
	}

	protected boolean update(Config config, Connection conn, String tableName, String primaryKey, LambkitRecord lambkitRecord) throws SQLException {
		if (modifyFlagInvalid(lambkitRecord)) {
			return false;
		}

		String[] pKeys = primaryKey.split(",");
		Object[] ids = new Object[pKeys.length];

		for (int i=0; i<pKeys.length; i++) {
			ids[i] = lambkitRecord.get(pKeys[i].trim());	// .trim() is important!
			if (ids[i] == null) {
				throw new ActiveRecordException("You can't update lambkitRecord without Primary Key, " + pKeys[i] + " can not be null.");
			}
		}

		StringBuilder sql = new StringBuilder();
		List<Object> paras = new ArrayList<Object>();
		config.getDialect().forDbUpdate(tableName, pKeys, ids, lambkitRecord, sql, paras);

		if (paras.size() <= 1) {	// 参数个数为 1 的情况表明只有主键，也无需更新
			return false;
		}

		int result = update(config, conn, sql.toString(), paras.toArray());
		if (result >= 1) {
			clearModifyFlag(lambkitRecord);
			//lambkitRecord.clearModifyFlag();
			Kv data = Kv.by("tableName", tableName).set("primaryKey", primaryKey).set("data", lambkitRecord);
			//EventKit.sendEvent(getTableName()+":update", data);
			return true;
		}
		return false;
	}

	public boolean modifyFlagInvalid(LambkitRecord lambkitRecord) {
		if (CPI.getModifyFlag(lambkitRecord) == null || CPI.getModifyFlag(lambkitRecord).isEmpty()) {
			return true;
		}
		return false;
	}

	public boolean modifyFlagInvalid(Model model) {
		if (CPI.getModifyFlag(model) == null || CPI.getModifyFlag(model).isEmpty()) {
			return true;
		}
		return false;
	}

	public void clearModifyFlag(LambkitRecord lambkitRecord) {
		if(CPI.getModifyFlag(lambkitRecord)!=null) {
			CPI.getModifyFlag(lambkitRecord).clear();
		}
	}

	/**
	 * Update M.
	 * <pre>
	 * Example:
	 * Db.use().update("user_role", "user_id, role_id", lambkitRecord);
	 * </pre>
	 * @param tableName the table name of the M save to
	 * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
	 * @param lambkitRecord the M object
	 */
	protected boolean update(String tableName, String primaryKey, LambkitRecord lambkitRecord) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();
			return update(getConfig(), conn, tableName, primaryKey, lambkitRecord);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	/**
	 * Update lambkitRecord with default primary key.
	 * <pre>
	 * Example:
	 * Db.use().update("user", lambkitRecord);
	 * </pre>
	 * @see #update(String, String, M)
	 */
	protected boolean update(String tableName, LambkitRecord lambkitRecord) {
		return update(tableName, getPrimaryKey(), lambkitRecord);
	}

	/**
	 * @see #execute(ICallback)
	 */
	public Object execute(ICallback callback) {
		return execute(getConfig(), callback);
	}

	/**
	 * Execute callback. It is useful when all the API can not satisfy your requirement.
	 * @param config the Config object
	 * @param callback the ICallback interface
	 */
	protected Object execute(Config config, ICallback callback) {
		Connection conn = null;
		try {
			conn = config.getConnection();
			return callback.call(conn);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			config.close(conn);
		}
	}

	/**
	 * Execute transaction.
	 * @param config the Config object
	 * @param transactionLevel the transaction level
	 * @param atom the atom operation
	 * @return true if transaction executing succeed otherwise false
	 */
	protected boolean tx(Config config, int transactionLevel, IAtom atom) {
		Connection conn = config.getThreadLocalConnection();
		if (conn != null) {	// Nested transaction support
			try {
				if (conn.getTransactionIsolation() < transactionLevel) {
					conn.setTransactionIsolation(transactionLevel);
				}
				boolean result = atom.run();
				if (result) {
					return true;
				}
				throw new NestedTransactionHelpException("Notice the outer transaction that the nested transaction return false");	// important:can not return false
			}
			catch (SQLException e) {
				throw new ActiveRecordException(e);
			}
		}

		Boolean autoCommit = null;
		try {
			conn = config.getConnection();
			autoCommit = conn.getAutoCommit();
			config.setThreadLocalConnection(conn);
			conn.setTransactionIsolation(transactionLevel);
			conn.setAutoCommit(false);
			boolean result = atom.run();
			if (result) {
				conn.commit();
			} else {
				conn.rollback();
			}
			return result;
		} catch (NestedTransactionHelpException e) {
			if (conn != null) {
				try {conn.rollback();} catch (Exception e1) {
					LogKit.error(e1.getMessage(), e1);}
			}
			LogKit.logNothing(e);
			return false;
		} catch (Throwable t) {
			if (conn != null) {
				try {conn.rollback();} catch (Exception e1) {LogKit.error(e1.getMessage(), e1);}
			}
			throw t instanceof RuntimeException ? (RuntimeException)t : new ActiveRecordException(t);
		} finally {
			try {
				if (conn != null) {
					if (autoCommit != null) {
						conn.setAutoCommit(autoCommit);
					}
					conn.close();
				}
			} catch (Throwable t) {
				LogKit.error(t.getMessage(), t);	// can not throw exception here, otherwise the more important exception in previous catch block can not be thrown
			} finally {
				config.removeThreadLocalConnection();	// prevent memory leak
			}
		}
	}

	/**
	 * Execute transaction with default transaction level.
	 * @see #tx(int, IAtom)
	 */
	public boolean tx(IAtom atom) {
		return tx(getConfig(), getConfig().getTransactionLevel(), atom);
	}

	public boolean tx(int transactionLevel, IAtom atom) {
		return tx(getConfig(), transactionLevel, atom);
	}

	/**
	 * 主要用于嵌套事务场景
	 *
	 * 实例：https://jfinal.com/feedback/4008
	 *
	 * 默认情况下嵌套事务会被合并成为一个事务，那么内层与外层任何地方回滚事务
	 * 所有嵌套层都将回滚事务，也就是说嵌套事务无法独立提交与回滚
	 *
	 * 使用 txInNewThread(...) 方法可以实现层之间的事务控制的独立性
	 * 由于事务处理是将 Connection 绑定到线程上的，所以 txInNewThread(...)
	 * 通过建立新线程来实现嵌套事务的独立控制
	 */
	public Future<Boolean> txInNewThread(IAtom atom) {
		FutureTask<Boolean> task = new FutureTask<>(() -> tx(getConfig(), getConfig().getTransactionLevel(), atom));
		Thread thread = new Thread(task);
		thread.setDaemon(true);
		thread.start();
		return task;
	}

	public Future<Boolean> txInNewThread(int transactionLevel, IAtom atom) {
		FutureTask<Boolean> task = new FutureTask<>(() -> tx(getConfig(), transactionLevel, atom));
		Thread thread = new Thread(task);
		thread.setDaemon(true);
		thread.start();
		return task;
	}

	/**
	 * Find M by cache.
	 * @see #find(String, Object...)
	 * @param cacheName the cache name
	 * @param key the key used to get date from cache
	 * @return the list of M
	 */
	public List<M> findByCache(String cacheName, Object key, String sql, Object... paras) {
		ICache cache = getConfig().getCache();
		List<M> result = cache.get(cacheName, key);
		if (result == null) {
			result = find(sql, paras);
			cache.put(cacheName, key, result);
		}
		return result;
	}

	/**
	 * @see #findByCache(String, Object, String, Object...)
	 */
	public List<M> findByCache(String cacheName, Object key, String sql) {
		return findByCache(cacheName, key, sql, LambkitConsts.NULL_PARA_ARRAY);
	}

	/**
	 * Find first lambkitRecord by cache. I recommend add "limit 1" in your sql.
	 * @see #findFirst(String, Object...)
	 * @param cacheName the cache name
	 * @param key the key used to get date from cache
	 * @param sql an SQL statement that may contain one or more '?' IN parameter placeholders
	 * @param paras the parameters of sql
	 * @return the M object
	 */
	public M findFirstByCache(String cacheName, Object key, String sql, Object... paras) {
		ICache cache = getConfig().getCache();
		M result = cache.get(cacheName, key);
		if (result == null) {
			result = findFirst(sql, paras);
			cache.put(cacheName, key, result);
		}
		return result;
	}

	/**
	 * @see #findFirstByCache(String, Object, String, Object...)
	 */
	public M findFirstByCache(String cacheName, Object key, String sql) {
		return findFirstByCache(cacheName, key, sql, LambkitConsts.NULL_PARA_ARRAY);
	}

	/**
	 * Paginate by cache.
	 * @see #paginate(int, int, String, String, Object...)
	 * @return Page
	 */
	public Page<M> paginateByCache(String cacheName, Object key, int pageNumber, int pageSize, String select, String sqlExceptSelect, Object... paras) {
		return doPaginateByCache(cacheName, key, pageNumber, pageSize, null, select, sqlExceptSelect, paras);
	}

	/**
	 * @see #paginateByCache(String, Object, int, int, String, String, Object...)
	 */
	public Page<M> paginateByCache(String cacheName, Object key, int pageNumber, int pageSize, String select, String sqlExceptSelect) {
		return doPaginateByCache(cacheName, key, pageNumber, pageSize, null, select, sqlExceptSelect, LambkitConsts.NULL_PARA_ARRAY);
	}

	public Page<M> paginateByCache(String cacheName, Object key, int pageNumber, int pageSize, boolean isGroupBySql, String select, String sqlExceptSelect, Object... paras) {
		return doPaginateByCache(cacheName, key, pageNumber, pageSize, isGroupBySql, select, sqlExceptSelect, paras);
	}

	protected Page<M> doPaginateByCache(String cacheName, Object key, int pageNumber, int pageSize, Boolean isGroupBySql, String select, String sqlExceptSelect, Object... paras) {
		ICache cache = getConfig().getCache();
		Page<M> result = cache.get(cacheName, key);
		if (result == null) {
			result = doPaginate(pageNumber, pageSize, isGroupBySql, select, sqlExceptSelect, paras);
			cache.put(cacheName, key, result);
		}
		return result;
	}

	protected int[] batch(Config config, Connection conn, String sql, Object[][] paras, int batchSize) throws SQLException {
		if (paras == null || paras.length == 0) {
			return new int[0];
		}
		if (batchSize < 1) {
			throw new IllegalArgumentException("The batchSize must more than 0.");
		}

		boolean isInTransaction = getConfig().isInTransaction();
		int counter = 0;
		int pointer = 0;
		int[] result = new int[paras.length];
		try (PreparedStatement pst = conn.prepareStatement(sql)) {
			for (int i=0; i<paras.length; i++) {
				for (int j=0; j<paras[i].length; j++) {
					Object value = paras[i][j];
					if (value instanceof java.util.Date) {
						if (value instanceof java.sql.Date) {
							pst.setDate(j + 1, (java.sql.Date)value);
						} else if (value instanceof Timestamp) {
							pst.setTimestamp(j + 1, (Timestamp)value);
						} else {
							// Oracle、SqlServer 中的 TIMESTAMP、DATE 支持 new Date() 给值
							java.util.Date d = (java.util.Date)value;
							pst.setTimestamp(j + 1, new Timestamp(d.getTime()));
						}
					}
					else {
						pst.setObject(j + 1, value);
					}
				}
				pst.addBatch();
				if (++counter >= batchSize) {
					counter = 0;
					int[] r = pst.executeBatch();
					if (isInTransaction == false) {
						conn.commit();
					}
					for (int k=0; k<r.length; k++) {
						result[pointer++] = r[k];
					}
				}
			}
			if (counter != 0) {
				int[] r = pst.executeBatch();
				if (isInTransaction == false) {
					conn.commit();
				}
				for (int k = 0; k < r.length; k++) {
					result[pointer++] = r[k];
				}
			}

			return result;
		}
	}

	/**
	 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
	 * <pre>
	 * Example:
	 * String sql = "insert into user(name, cash) values(?, ?)";
	 * int[] result = Db.use().batch(sql, new Object[][]{{"James", 888}, {"zhanjin", 888}});
	 * </pre>
	 * @param sql The SQL to execute.
	 * @param paras An array of query replacement parameters.  Each row in this array is one set of batch replacement values.
	 * @return The number of rows updated per statement
	 */
	public int[] batch(String sql, Object[][] paras, int batchSize) {
		Connection conn = null;
		Boolean autoCommit = null;
		try {
			conn = getConfig().getConnection();
			autoCommit = conn.getAutoCommit();
			conn.setAutoCommit(false);
			return batch(getConfig(), conn, sql, paras, batchSize);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			if (autoCommit != null) {
				try {
					conn.setAutoCommit(autoCommit);
				} catch (Exception e) {
					LogKit.error(e.getMessage(), e);
				}
			}
			getConfig().close(conn);
		}
	}

	protected int[] batch(Config config, Connection conn, String sql, String columns, List list, int batchSize) throws SQLException {
		if (list == null || list.size() == 0) {
			return new int[0];
		}
		Object element = list.get(0);
		if (!(element instanceof LambkitRecord) && !(element instanceof Model)) {
			throw new IllegalArgumentException("The element in list must be Model or M.");
		}
		if (batchSize < 1) {
			throw new IllegalArgumentException("The batchSize must more than 0.");
		}
		boolean isModel = element instanceof Model;

		String[] columnArray = columns.split(",");
		for (int i=0; i<columnArray.length; i++) {
			columnArray[i] = columnArray[i].trim();
		}

		boolean isInTransaction = config.isInTransaction();
		int counter = 0;
		int pointer = 0;
		int size = list.size();
		int[] result = new int[size];
		try (PreparedStatement pst = conn.prepareStatement(sql)) {
			for (int i=0; i<size; i++) {
				//Map map = isModel ? ((Model)list.get(i))._getAttrs() : ((M)list.get(i)).getColumns();
				Map map = isModel ? CPI.getAttrs(((Model)list.get(i))) : ((M)list.get(i)).getColumns();
				for (int j=0; j<columnArray.length; j++) {
					Object value = map.get(columnArray[j]);
					if (value instanceof java.util.Date) {
						if (value instanceof java.sql.Date) {
							pst.setDate(j + 1, (java.sql.Date)value);
						} else if (value instanceof Timestamp) {
							pst.setTimestamp(j + 1, (Timestamp)value);
						} else {
							// Oracle、SqlServer 中的 TIMESTAMP、DATE 支持 new Date() 给值
							java.util.Date d = (java.util.Date)value;
							pst.setTimestamp(j + 1, new Timestamp(d.getTime()));
						}
					}
					else {
						pst.setObject(j + 1, value);
					}
				}
				pst.addBatch();
				if (++counter >= batchSize) {
					counter = 0;
					int[] r = pst.executeBatch();
					if (isInTransaction == false) {
						conn.commit();
					}
					for (int k=0; k<r.length; k++) {
						result[pointer++] = r[k];
					}
				}
			}
			if (counter != 0) {
				int[] r = pst.executeBatch();
				if (isInTransaction == false) {
					conn.commit();
				}
				for (int k = 0; k < r.length; k++) {
					result[pointer++] = r[k];
				}
			}

			return result;
		}
	}

	/**
	 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
	 * <pre>
	 * Example:
	 * String sql = "insert into user(name, cash) values(?, ?)";
	 * int[] result = Db.use().batch(sql, "name, cash", modelList, 500);
	 * </pre>
	 * @param sql The SQL to execute.
	 * @param columns the columns need be processed by sql.
	 * @param modelOrRecordList model or lambkitRecord object list.
	 * @param batchSize batch size.
	 * @return The number of rows updated per statement
	 */
	public int[] batch(String sql, String columns, List modelOrRecordList, int batchSize) {
		Connection conn = null;
		Boolean autoCommit = null;
		try {
			conn = getConfig().getConnection();
			autoCommit = conn.getAutoCommit();
			conn.setAutoCommit(false);
			return batch(getConfig(), conn, sql, columns, modelOrRecordList, batchSize);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			if (autoCommit != null) {
				try {
					conn.setAutoCommit(autoCommit);
				} catch (Exception e) {
					LogKit.error(e.getMessage(), e);
				}
			}
			getConfig().close(conn);
		}
	}

	protected int[] batch(Config config, Connection conn, List<String> sqlList, int batchSize) throws SQLException {
		if (sqlList == null || sqlList.size() == 0) {
			return new int[0];
		}
		if (batchSize < 1) {
			throw new IllegalArgumentException("The batchSize must more than 0.");
		}

		boolean isInTransaction = config.isInTransaction();
		int counter = 0;
		int pointer = 0;
		int size = sqlList.size();
		int[] result = new int[size];
		try (Statement st = conn.createStatement()) {
			for (int i=0; i<size; i++) {
				st.addBatch(sqlList.get(i));
				if (++counter >= batchSize) {
					counter = 0;
					int[] r = st.executeBatch();
					if (isInTransaction == false) {
						conn.commit();
					}
					for (int k=0; k<r.length; k++) {
						result[pointer++] = r[k];
					}
				}
			}
			if (counter != 0) {
				int[] r = st.executeBatch();
				if (isInTransaction == false) {
					conn.commit();
				}
				for (int k = 0; k < r.length; k++) {
					result[pointer++] = r[k];
				}
			}

			return result;
		}
	}

	/**
	 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
	 * <pre>
	 * Example:
	 * int[] result = Db.use().batch(sqlList, 500);
	 * </pre>
	 * @param sqlList The SQL list to execute.
	 * @param batchSize batch size.
	 * @return The number of rows updated per statement
	 */
	public int[] batch(List<String> sqlList, int batchSize) {
		Connection conn = null;
		Boolean autoCommit = null;
		try {
			conn = getConfig().getConnection();
			autoCommit = conn.getAutoCommit();
			conn.setAutoCommit(false);
			return batch(getConfig(), conn, sqlList, batchSize);
		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			if (autoCommit != null) {
				try {
					conn.setAutoCommit(autoCommit);
				} catch (Exception e) {
					LogKit.error(e.getMessage(), e);
				}
			}
			getConfig().close(conn);
		}
	}

	/**
	 * Batch save records using the "insert into ..." sql generated by the first lambkitRecord in recordList.
	 * Ensure all the lambkitRecord can use the same sql as the first lambkitRecord.
	 * @param tableName the table name
	 */
	public int[] batchSave(String tableName, List<? extends M> recordList, int batchSize) {
		if (recordList == null || recordList.size() == 0) {
			return new int[0];
		}

		M lambkitRecord = recordList.get(0);
		Map<String, Object> cols = lambkitRecord.toMap();
		int index = 0;
		StringBuilder columns = new StringBuilder();
		// the same as the iterator in Dialect.forDbSave() to ensure the order of the columns
		for (Map.Entry<String, Object> e : cols.entrySet()) {
			if (getConfig().getDialect().isOracle()) {	// 支持 oracle 自增主键
				Object value = e.getValue();
				if (value instanceof String && ((String)value).endsWith(".nextval")) {
					continue ;
				}
			}

			if (index++ > 0) {
				columns.append(',');
			}
			columns.append(e.getKey());
		}

		String[] pKeysNoUse = new String[0];
		StringBuilder sql = new StringBuilder();
		List<Object> parasNoUse = new ArrayList<Object>();
		getConfig().getDialect().forDbSave(tableName, pKeysNoUse, lambkitRecord, sql, parasNoUse);
		return batch(sql.toString(), columns.toString(), recordList, batchSize);
	}

	/**
	 * Batch update records using the columns names of the first lambkitRecord in recordList.
	 * Ensure all the records can use the same sql as the first lambkitRecord.
	 * @param tableName the table name
	 * @param primaryKey the primary key of the table, composite primary key is separated by comma character: ","
	 */
	public int[] batchUpdate(String tableName, String primaryKey, List<? extends M> recordList, int batchSize) {
		if (recordList == null || recordList.size() == 0) {
			return new int[0];
		}

		String[] pKeys = primaryKey.split(",");
		getConfig().getDialect().trimPrimaryKeys(pKeys);

		M lambkitRecord = recordList.get(0);

		// M 新增支持 modifyFlag

		if (CPI.getModifyFlag(lambkitRecord) == null || CPI.getModifyFlag(lambkitRecord).isEmpty()) {
			return new int[0];
		}
		Set<String> modifyFlag = CPI.getModifyFlag(lambkitRecord);

		Map<String, Object> cols = lambkitRecord.toMap();
		List<String> colNames = new ArrayList<String>();
		// the same as the iterator in Dialect.forDbUpdate() to ensure the order of the columns
		for (Map.Entry<String, Object> e : cols.entrySet()) {
			String col = e.getKey();
			if (modifyFlag.contains(col) && !getConfig().getDialect().isPrimaryKey(col, pKeys)) {
				colNames.add(col);
			}
		}
		for (String pKey : pKeys) {
			colNames.add(pKey);
		}
		String columns = StrUtil.join(",", colNames.toArray(new Object[colNames.size()]));

		Object[] idsNoUse = new Object[pKeys.length];
		StringBuilder sql = new StringBuilder();
		List<Object> parasNoUse = new ArrayList<Object>();
		getConfig().getDialect().forDbUpdate(tableName, pKeys, idsNoUse, lambkitRecord, sql, parasNoUse);
		return batch(sql.toString(), columns, recordList, batchSize);
	}

	/**
	 * Batch update records with default primary key, using the columns names of the first lambkitRecord in recordList.
	 * Ensure all the records can use the same sql as the first lambkitRecord.
	 * @param tableName the table name
	 */
	public int[] batchUpdate(String tableName, List<? extends M> recordList, int batchSize) {
		return batchUpdate(tableName, getPrimaryKey(),recordList, batchSize);
	}

	public String getSql(String key) {
		return getConfig().getSqlKit().getSql(key);
	}

	// 支持传入变量用于 sql 生成。为了避免用户将参数拼接在 sql 中引起 sql 注入风险，只在 SqlKit 中开放该功能
	// public String getSql(String key, Map data) {
	//     return getConfig().getSqlKit().getSql(key, data);
	// }

	public SqlPara getSqlPara(String key, M lambkitRecord) {
		return getSqlPara(key, lambkitRecord.toMap());
	}

	public SqlPara getSqlPara(String key, Model model) {
		return getSqlPara(key, CPI.getAttrs(model));//model._getAttrs()
	}

	public SqlPara getSqlPara(String key, Map data) {
		return getConfig().getSqlKit().getSqlPara(key, data);
	}

	public SqlPara getSqlPara(String key, Object... paras) {
		return getConfig().getSqlKit().getSqlPara(key, paras);
	}

	public SqlPara getSqlParaByString(String content, Map data) {
		return getConfig().getSqlKit().getSqlParaByString(content, data);
	}

	public SqlPara getSqlParaByString(String content, Object... paras) {
		return getConfig().getSqlKit().getSqlParaByString(content, paras);
	}

	public List<M> find(SqlPara sqlPara) {
		return find(sqlPara.getSql(), sqlPara.getPara());
	}

	public M findFirst(SqlPara sqlPara) {
		return findFirst(sqlPara.getSql(), sqlPara.getPara());
	}

	public int update(SqlPara sqlPara) {
		return update(sqlPara.getSql(), sqlPara.getPara());
	}

	public Page<M> paginate(int pageNumber, int pageSize, SqlPara sqlPara) {
		String[] sqls = PageSqlKit.parsePageSql(sqlPara.getSql());
		return doPaginate(pageNumber, pageSize, null, sqls[0], sqls[1], sqlPara.getPara());
	}

	public Page<M> paginate(int pageNumber, int pageSize, boolean isGroupBySql, SqlPara sqlPara) {
		String[] sqls = PageSqlKit.parsePageSql(sqlPara.getSql());
		return doPaginate(pageNumber, pageSize, isGroupBySql, sqls[0], sqls[1], sqlPara.getPara());
	}

	// ---------

	/**
	 * 迭代处理每一个查询出来的 M 对象
	 * <pre>
	 * 例子：
	 * Db.each(lambkitRecord -> {
	 *    // 处理 lambkitRecord 的代码在此
	 *
	 *    // 返回 true 继续循环处理下一条数据，返回 false 立即终止循环
	 *    return true;
	 * }, sql, paras);
	 * </pre>
	 */
	public void each(Function<M, Boolean> func, String sql, Object... paras) {
		Connection conn = null;
		try {
			conn = getConfig().getConnection();

			try (PreparedStatement pst = conn.prepareStatement(sql)) {
				getConfig().getDialect().fillStatement(pst, paras);
				ResultSet rs = pst.executeQuery();
				//getConfig().getDialect().eachRecord(getConfig(), rs, func);
				LambkitPojoBuilder.me.build(getConfig(), rs, getLambkitRecordClass(), getFunction(func));
				close(rs);
			}

		} catch (Exception e) {
			throw new ActiveRecordException(e);
		} finally {
			getConfig().close(conn);
		}
	}

	static final void close(ResultSet rs, Statement st) throws SQLException {
		if (rs != null) {rs.close();}
		if (st != null) {st.close();}
	}

	static final void close(ResultSet rs) throws SQLException {
		if (rs != null) {rs.close();}
	}

	static final void close(Statement st) throws SQLException {
		if (st != null) {st.close();}
	}
}
